#include "StdAfx.h"
#include <winsock2.h>
#include <event.h>
//#include <evhttp.h>
#include <stdint.h>
#include <fcntl.h>
#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <io.h>
#include <vector>
#include "IOManager.h"
#include "SocketStreamInput.h"
#include "BufferConstraint.h"

// Force FD_SETSIZE to 2048 for the code in this file (it is compared against
// accepted descriptors in the accept callbacks below).
// NOTE(review): <winsock2.h> has already been included above, so this
// redefinition presumably does not resize fd_set itself - confirm whether it
// was meant to appear before that include.
#ifdef   FD_SETSIZE 
#undef   FD_SETSIZE 
#endif
#define   FD_SETSIZE   2048  
// Libraries requested from the linker through MSVC's #pragma comment(lib, ...).
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "wsock32.lib")
#pragma comment(lib, "libevent.lib")
#pragma comment(lib, "libevent_core.lib")
#pragma comment(lib, "libevent_extras.lib")





////////////////////stream input logic//////////////
void input_read(struct bufferevent *bev, void *ctx)
{
	SocketStreamInput* streamInput = (SocketStreamInput*)ctx;
	struct evbuffer *input;

	input = bufferevent_get_input(bev);

	size_t buffer_len = evbuffer_get_length(input);

	char *record = new char [buffer_len];
	if (buffer_len <= 0)
		return ;
	evbuffer_remove(input,record,buffer_len);

	streamInput->readData(record, buffer_len);
	delete[]record;
}






// Event callback installed on every accepted connection (stream input and
// command input).
//
// The flags that may be reported in `error` are:
//   BEV_EVENT_EOF     - the connection has been closed
//   BEV_EVENT_ERROR   - an error occurred (errno tells which one)
//   BEV_EVENT_TIMEOUT - a timeout fired
// None of them has dedicated clean-up yet; in every case, including when none
// of these flags is set, the bufferevent is simply released.
void errorcb(struct bufferevent *bev, short error, void *ctx)
{
	(void)error;	// no per-condition handling implemented
	(void)ctx;	// callback argument is not needed for the tear-down

	bufferevent_free(bev);
}

// Accept callback for the stream-input listening socket.
//
// Accepts one pending connection, wraps it in a bufferevent on the
// IOManager's event base and routes incoming data to input_read(), with
// `arg` (the SocketStreamInput registered in addStreamInput) as context.
//
// listener: listening socket that became readable.
// event:    libevent event flags (unused).
// arg:      callback argument forwarded to input_read()/errorcb().
void input_accept(evutil_socket_t listener, short event, void *arg)
{
	struct event_base *base = IOManager::getInstance()->base;
	struct sockaddr_storage ss;
	socklen_t slen = sizeof(ss);

	// Keep the handle in evutil_socket_t: on Windows a SOCKET is
	// pointer-sized and must not be truncated to int.
	evutil_socket_t fd = (evutil_socket_t)accept(listener, (struct sockaddr*)&ss, &slen);
	if (fd < 0) {
		perror("accept");
		return;
	}

#ifndef WIN32
	// Descriptor-range guard kept for non-Windows builds only: Winsock
	// handles are opaque values, not small indices, so comparing them with
	// FD_SETSIZE would reject perfectly valid connections.
	if (fd > FD_SETSIZE) {
		evutil_closesocket(fd);
		return;
	}
#endif

	evutil_make_socket_nonblocking(fd);

	struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
	if (bev == NULL) {
		// Nothing owns the socket yet, so close it here. CRT close() is not
		// valid for Winsock sockets; evutil_closesocket is portable.
		evutil_closesocket(fd);
		return;
	}

	bufferevent_setcb(bev, input_read, NULL, errorcb, arg);
	bufferevent_setwatermark(bev, EV_READ, 0, PAGE_SIZE);
	bufferevent_enable(bev, EV_READ|EV_WRITE);
}

////////////////////stream output logic//////////////
void output_connected(struct bufferevent *bev, short events, void *ptr)
{
	struct evbuffer  *output;
	bufferevent_setwatermark(bev, EV_WRITE, 0, PAGE_SIZE); 
	output = bufferevent_get_output(bev);
	struct event_base *base = IOManager::getInstance()->base;

	if (events & BEV_EVENT_CONNECTED) {
		IOManager::getInstance()->outputMap.insert(make_pair((SocketStreamOutput*)ptr,output));
		// evbuffer_add(output, a, 4);
	} else if (events & BEV_EVENT_ERROR) {
		/* An error occured while connecting. */
	}

}
// Queues `length` bytes from `buffer` on the connection registered for
// `streamOutput`.
//
// buffer:       data to send.
// length:       number of bytes in `buffer`; negative values are rejected.
// streamOutput: output whose connection should carry the data.
//
// Returns true when the data was appended to the connection's output buffer.
// Returns false when no connection is registered yet (a connection attempt to
// streamOutput->ip / streamOutput->port is started and the data is NOT
// queued), when `length` is negative, or when libevent fails to append.
bool IOManager::writeOutput(char* buffer, int length,SocketStreamOutput* streamOutput)
{
	std::map<SocketStreamOutput*, struct evbuffer*>::iterator it =
		this->outputMap.find(streamOutput);
	if (it == this->outputMap.end())
	{
		//assert(false);
		// Not connected (yet): kick off a connection; the caller has to retry.
		addStreamOutput(streamOutput->ip, streamOutput->port, streamOutput);
		return false;
	}

	// A negative length would turn into a huge size_t inside evbuffer_add.
	if (length < 0)
		return false;

	// evbuffer_add returns 0 on success and -1 on failure; report that
	// instead of claiming success unconditionally.
	return evbuffer_add(it->second, buffer, static_cast<size_t>(length)) == 0;
}
////////////////////command input logic//////////////
void command_read(struct bufferevent *bev, void *ctx)
{
	CommandManager * commandManager = (CommandManager*)ctx;
	struct evbuffer *input,*output;

	input = bufferevent_get_input(bev);
	output = bufferevent_get_output(bev);
	
	size_t buffer_len = evbuffer_get_length(input);

	char *record = new char [buffer_len];
	if (buffer_len <= 0)
		return ;
	evbuffer_remove(input,record,buffer_len);

	bool bl = commandManager->processCommand(record, buffer_len);
	*(bool*)record = bl;
	evbuffer_add(output, record, 1);
	delete[]record;
}
// Accept callback for the command listening socket.
//
// Accepts one pending connection, wraps it in a bufferevent on the
// IOManager's event base and routes incoming data to command_read(), with
// `arg` (the CommandManager registered in addCommandInput) as context.
//
// listener: listening socket that became readable.
// event:    libevent event flags (unused).
// arg:      callback argument forwarded to command_read()/errorcb().
void command_accept(evutil_socket_t listener, short event, void *arg)
{
	struct event_base *base = IOManager::getInstance()->base;
	struct sockaddr_storage ss;
	socklen_t slen = sizeof(ss);

	// Keep the handle in evutil_socket_t: on Windows a SOCKET is
	// pointer-sized and must not be truncated to int.
	evutil_socket_t fd = (evutil_socket_t)accept(listener, (struct sockaddr*)&ss, &slen);
	if (fd < 0) {
		perror("accept");
		return;
	}

#ifndef WIN32
	// Descriptor-range guard kept for non-Windows builds only: Winsock
	// handles are opaque values, not small indices, so comparing them with
	// FD_SETSIZE would reject perfectly valid connections.
	if (fd > FD_SETSIZE) {
		evutil_closesocket(fd);
		return;
	}
#endif

	evutil_make_socket_nonblocking(fd);

	struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
	if (bev == NULL) {
		// Nothing owns the socket yet, so close it here. CRT close() is not
		// valid for Winsock sockets; evutil_closesocket is portable.
		evutil_closesocket(fd);
		return;
	}

	bufferevent_setcb(bev, command_read, NULL, errorcb, arg);
	bufferevent_setwatermark(bev, EV_READ, 0, PAGE_SIZE);
	bufferevent_enable(bev, EV_READ|EV_WRITE);
}


// Storage for the singleton instance; starts out NULL and is filled in by
// getInstance() on first use.
IOManager * IOManager::ioManager = NULL;
// Initialises Winsock (version 2.0 requested) and creates the libevent base
// used by every listener and connection of this manager.
//
// A constructor cannot return an error, so failures are reported on stderr
// instead of being dropped silently; `base` is NULL afterwards if
// event_base_new() failed.
IOManager::IOManager(void)
{
	WSADATA wsaData;
	const WORD sockVersion = MAKEWORD(2, 0);

	// WSAStartup returns 0 on success, otherwise the error code directly.
	const int wsaError = WSAStartup(sockVersion, &wsaData);
	if (wsaError != 0)
		fprintf(stderr, "IOManager: WSAStartup failed with error %d\n", wsaError);

	this->base = event_base_new();
	if (!this->base)
		fprintf(stderr, "IOManager: event_base_new failed\n");
}



// Releases the libevent base created in the constructor and shuts Winsock
// down again.
IOManager::~IOManager(void)
{
	// The base was previously never freed. Release it before WSACleanup
	// so libevent is torn down while Winsock is still initialised.
	if (this->base != NULL) {
		event_base_free(this->base);
		this->base = NULL;
	}

	// Do not leave the singleton pointer dangling if this was the instance
	// handed out by getInstance().
	if (ioManager == this)
		ioManager = NULL;

	WSACleanup();
}
// Returns the process-wide IOManager, creating it on the first call.
IOManager* IOManager::getInstance(void)
{
	// Fast path: the instance already exists.
	if (ioManager != NULL)
		return ioManager;

	ioManager = new IOManager();
	return ioManager;
}
// Runs one pass of the event loop on this manager's base with
// EVLOOP_NONBLOCK, i.e. without waiting for new events.
void IOManager::executeOnce()
{
	const int loopFlags = EVLOOP_NONBLOCK;
	event_base_loop(this->base, loopFlags);
}
void IOManager::addStreamInput(std::string ip, std::string port, SocketStreamInput* streamInput)
{


	int iport = atoi(port.c_str());
	evutil_socket_t listener;
	struct sockaddr_in my_addr;   
	struct event *listener_event;
	my_addr.sin_family = AF_INET;
	my_addr.sin_port = htons(iport);
	my_addr.sin_addr.s_addr = INADDR_ANY;
	listener = socket(AF_INET, SOCK_STREAM, 0 );
	evutil_make_socket_nonblocking(listener);
#ifndef WIN32
	{
		int one = 1;
		setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
	}
#endif
	if(::bind(listener, (struct sockaddr*)&my_addr, sizeof(my_addr))==SOCKET_ERROR)
	{
		perror("bind");
		return;
	};

	if (listen(listener, 16)<0) {  
		perror("listen");
		return;
	}


	listener_event = event_new(base, listener, EV_READ|EV_PERSIST, input_accept, (void*)streamInput);  
	/*XXX check it */
	event_add(listener_event, NULL); 

}


bool IOManager::addStreamOutput(std::string ip, std::string port, SocketStreamOutput* streamOutput)
{

	int iport = atoi(port.c_str());
	struct event_base *base = this->base;
	struct bufferevent *bev;
	struct sockaddr_in sin;

	//base = event_base_new();

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
	sin.sin_port = htons(iport); /* Port 8080 */	

	bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

	bufferevent_setcb(bev, NULL, NULL, output_connected, (void*)streamOutput);
	//bufferevent_setcb(bev, NULL, NULL, output_connected, (void*)streamOutput); 

	if (bufferevent_socket_connect(bev,
		(struct sockaddr *)&sin, sizeof(sin)) < 0) {
			/* Error starting connection */
			bufferevent_free(bev);
			return false;
	}

	//event_base_dispatch(base);
	return true;
}

void IOManager::addCommandInput(std::string ip,std::string port, CommandManager* commandManager)
{
	

	int iport = atoi(port.c_str());
	evutil_socket_t listener;
	struct sockaddr_in my_addr;   
	struct event *listener_event;
	my_addr.sin_family = AF_INET;
	my_addr.sin_port = htons(iport);
	my_addr.sin_addr.s_addr = INADDR_ANY;
	listener = socket(AF_INET, SOCK_STREAM, 0 );
	evutil_make_socket_nonblocking(listener);
#ifndef WIN32
	{
		int one = 1;
		setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
	}
#endif
	if(::bind(listener, (struct sockaddr*)&my_addr, sizeof(my_addr))==SOCKET_ERROR)
	{
		perror("bind");
		return;
	};

	if (listen(listener, 16)<0) {  
		perror("listen");
		return;
	}
	 

	listener_event = event_new(base, listener, EV_READ|EV_PERSIST, command_accept, (void*)commandManager);  
 
	event_add(listener_event, NULL); 
}

